
Conversation

@mohammedabdulwahhab
Contributor

@mohammedabdulwahhab mohammedabdulwahhab commented Nov 5, 2025

  • adds kubernetes backed discovery

Summary by CodeRabbit

  • New Features

    • Added Kubernetes-based service discovery capability for dynamic model and endpoint registration.
    • Introduced local testing mode for Kubernetes integration with configurable metadata servers.
  • Bug Fixes

    • Fixed undefined import reference in integration module.
  • Documentation

    • Added comprehensive Kubernetes discovery integration testing guides and local testing workflow documentation.
  • Tests

    • Added new Kubernetes discovery client integration tests and helper scripts for deployment, cleanup, and test pod creation.
  • Chores

    • Updated git ignore configuration.

@mohammedabdulwahhab mohammedabdulwahhab requested review from a team as code owners November 5, 2025 22:51
@copy-pr-bot

copy-pr-bot bot commented Nov 5, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions github-actions bot added the fix label Nov 5, 2025
@mohammedabdulwahhab mohammedabdulwahhab marked this pull request as draft November 5, 2025 22:52
@mohammedabdulwahhab mohammedabdulwahhab changed the title from "fix: add kube impl for discovery" to "fix: add kube impl for discovery and add metadata endpoint" Nov 5, 2025
@mohammedabdulwahhab mohammedabdulwahhab changed the base branch from mabdulwahhab/modify-call-sites to main November 6, 2025 22:49
@mohammedabdulwahhab mohammedabdulwahhab marked this pull request as ready for review November 6, 2025 22:53
@coderabbitai
Contributor

coderabbitai bot commented Nov 6, 2025

Walkthrough

This pull request replaces etcd-based service discovery with a Kubernetes-native discovery client, introducing a new discovery architecture with support for both Kubernetes (via EndpointSlices) and KV store backends. It includes comprehensive Kubernetes testing infrastructure, public API updates for discovery interfaces, and integration across multiple service components.
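
For orientation, the sketch below reconstructs the rough shape of the discovery surface this walkthrough refers to, using only names that appear in the review (DiscoveryClient, DiscoveryKey, DiscoveryInstance, DiscoveryEvent, DiscoveryStream, list_and_watch). It is a hypothetical outline: signatures, field types, and error handling are assumptions, not the actual definitions in lib/runtime/src/discovery/mod.rs.

```rust
// Hypothetical sketch of the discovery API surface referenced throughout this
// review. Names come from the PR; signatures and payload types are guesses.
use std::pin::Pin;

use futures::Stream;

pub enum DiscoveryKey {
    AllModelCards,
    // per-endpoint / per-component variants elided
}

// Stand-in for the runtime's component::Instance carried by the Endpoint variant.
pub struct EndpointInstance {
    pub instance_id: u64,
    pub endpoint: String, // e.g. "generate", "clear_kv_blocks"
}

pub enum DiscoveryInstance {
    Endpoint(EndpointInstance),
    ModelCard { instance_id: u64, card: serde_json::Value },
}

pub enum DiscoveryEvent {
    Added(DiscoveryInstance), // instance appeared (or re-appeared with new data)
    Removed(u64),             // instance_id that disappeared
}

pub type DiscoveryStream =
    Pin<Box<dyn Stream<Item = anyhow::Result<DiscoveryEvent>> + Send>>;

#[async_trait::async_trait]
pub trait DiscoveryClient: Send + Sync {
    fn instance_id(&self) -> u64;
    async fn list(&self, key: DiscoveryKey) -> anyhow::Result<Vec<DiscoveryInstance>>;
    async fn list_and_watch(&self, key: DiscoveryKey) -> anyhow::Result<DiscoveryStream>;
    // register(DiscoverySpec) also exists; per the endpoint.rs comment further
    // down, it returns a guard whose Drop removes the registration.
}
```

Both the Kubernetes client (EndpointSlice-backed) and the KV-store client described below are expected to implement this trait.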

Changes

  • Discovery Core Infrastructure (lib/runtime/src/discovery/mod.rs, lib/runtime/src/discovery/utils.rs): Expanded the DiscoveryKey, DiscoverySpec, and DiscoveryInstance enums to support ModelCard variants and structured discovery. Added a list() method to the DiscoveryClient trait. Introduced the watch_and_extract_field utility for streaming field extraction from discovery events.
  • Kubernetes Discovery Client (lib/runtime/src/discovery/kube.rs): New Kubernetes-based discovery client implementing DiscoveryClient, with EndpointSlice handling, pod metadata caching, HTTP-based remote metadata fetching, and a streaming watch mechanism that emits Added/Removed events.
  • KV Store Discovery Client (lib/runtime/src/discovery/kv_store.rs): New KVStoreDiscoveryClient implementing DiscoveryClient, backed by KeyValueStoreManager for backward compatibility and supporting registration, listing, and watching of Endpoint and ModelCard instances.
  • Discovery Backend Selection & Runtime Integration (lib/runtime/src/distributed.rs, lib/runtime/src/lib.rs, lib/runtime/src/system_status_server.rs): Dynamic discovery backend selection (Kubernetes or KV store) via the DYN_DISCOVERY_BACKEND environment variable (a rough sketch of this selection follows the list). Added a discovery_metadata field to DistributedRuntime and SystemStatusState. New /metadata HTTP endpoint on the system status server. Changed connection_id() to use discovery_client.instance_id().
  • Component Discovery (lib/runtime/src/component.rs): Added a Hash derive to TransportType. Reworked Component::list_instances to use discovery_client.list() instead of etcd-based storage.
  • Component Client Dynamic Discovery (lib/runtime/src/component/client.rs): Extensive refactoring from the etcd watcher to discovery stream-based endpoint discovery. Replaced kv_get_and_watch_prefix with discovery list_and_watch and added logging instrumentation at key control-flow points.
  • Endpoint Registration (lib/runtime/src/component/endpoint.rs): Replaced etcd-based service registration with discovery_client.register() using DiscoverySpec. Removed etcd-specific key/JSON serialization logic.
  • Instance Listing Utility (lib/runtime/src/instances.rs): The public function list_all_instances changed its signature from a KeyValueStoreManager reference to an Arc.
  • Storage KV Watch Enhancement (lib/runtime/src/storage/key_value_store.rs): Added a pre-watch phase that fetches existing entries as Put events, a deduplication mechanism for Put events, and detailed tracing logs throughout the watch lifecycle.
  • LLM Model Watcher (lib/llm/src/discovery/watcher.rs): Replaced the Receiver input with DiscoveryStream. Reworked event handling to support the DiscoveryEvent::Added(model_card) and Removed variants with deserialization and namespace filtering. Changed key management from string-based keys to hex-formatted instance_id.
  • LLM Worker Monitor (lib/llm/src/discovery/worker_monitor.rs): Switched runtime config watching from etcd to discovery using list_and_watch(AllModelCards) and watch_and_extract_field for runtime_config extraction. Added verbose logging for config updates.
  • LLM HTTP Service Integration (lib/llm/src/http/service/service_v2.rs, lib/llm/src/http/service/health.rs, lib/llm/src/http/service/clear_kv_blocks.rs): Added a discovery_client field and accessor to State. Updated health_handler and clear_kv_blocks_handler to use discovery_client instead of the store for instance discovery.
  • LLM KV Router (lib/llm/src/kv_router.rs, lib/llm/src/kv_router/scheduler.rs, lib/llm/src/kv_router/subscriber.rs): Replaced the etcd-based runtime config watcher with a discovery-driven approach using watch_and_extract_field. Updated the subscriber to use discovery list_and_watch for generate endpoint instances. Added scheduler validation logging.
  • LLM Entrypoint Integration (lib/llm/src/entrypoint/input/common.rs, lib/llm/src/entrypoint/input/grpc.rs, lib/llm/src/entrypoint/input/http.rs): Replaced store-based model card watching with discovery_client.list_and_watch(AllModelCards). Updated function signatures to remove the store parameter. Added HTTP endpoint enable/disable helpers for model lifecycle management.
  • LLM Local Model (lib/llm/src/local_model.rs): Replaced the KV store-based card Publish with discovery_client.register() using DiscoverySpec.
  • LLM Tests (lib/llm/tests/http_metrics.rs): Replaced the etcd-based model registration watcher with a discovery stream-based watcher.
  • vLLM Component (components/src/dynamo/vllm/main.py): Commented out the import of get_consolidator_endpoints (leaving code that references an undefined symbol at runtime).
  • Kubernetes Test Infrastructure (k8s-test/deploy.sh, k8s-test/cleanup.sh, k8s-test/run-tests.sh, k8s-test/create-local-test-pod.sh, k8s-test/manifests/test-deployment.yaml): New test infrastructure covering deployment automation, cleanup scripts, test orchestration for multiple test suites (raw Kubernetes API, KubeDiscoveryClient), and local testing pod creation with metadata server support.
  • Kubernetes Test Documentation (k8s-test/README.md, k8s-test/LOCAL_TESTING.md): New documentation for Kubernetes discovery integration testing, the local testing mode with DYN_LOCAL_KUBE_TEST, an architecture overview, troubleshooting, and end-to-end workflows.
  • Kubernetes Integration Tests (lib/runtime/tests/kube_client_integration.rs, lib/runtime/tests/kube_discovery_integration.rs): New integration test modules validating KubeDiscoveryClient creation, listing endpoints by various keys, and watching EndpointSlices with event validation.
  • Runtime Dependencies (lib/runtime/Cargo.toml): Added the workspace dependency reqwest; added the Kubernetes stack (kube, k8s-openapi, schemars) with features; extended dev-dependencies with testing utilities (env_logger, rstest, temp-env, stdio-override, jsonschema, tempfile).
  • Git Configuration (.gitignore): Added rebuild.sh to the ignored files under the Direnv section.
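
The backend selection mentioned in the Discovery Backend Selection & Runtime Integration entry above could look roughly like the following. This is a sketch only: the accepted values of DYN_DISCOVERY_BACKEND, the fallback behavior, and the constructor argument lists (KubeDiscoveryClient::new in particular) are assumptions rather than the actual code in lib/runtime/src/distributed.rs; only KVStoreDiscoveryClient::new(store, cancel_token) appears verbatim elsewhere in this review.

```rust
// Hypothetical env-driven backend selection; see lib/runtime/src/distributed.rs
// for the real wiring. Constructor argument lists are placeholders.
use std::sync::Arc;

fn select_discovery_backend(
    store: KeyValueStoreManager,
    cancel_token: CancellationToken,
) -> anyhow::Result<Arc<dyn DiscoveryClient>> {
    match std::env::var("DYN_DISCOVERY_BACKEND").as_deref() {
        // Kubernetes-native path: watches EndpointSlices, hashes pod names into
        // instance ids, and fetches worker metadata over HTTP (/metadata).
        Ok("kubernetes") => Ok(Arc::new(KubeDiscoveryClient::new(cancel_token)?)),
        // Default path: backward-compatible client backed by the existing KV store.
        _ => Ok(Arc::new(KVStoreDiscoveryClient::new(store, cancel_token))),
    }
}
```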

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~90 minutes

Areas requiring extra attention:

  • lib/runtime/src/discovery/kube.rs: Large new file (~500+ lines) with complex Kubernetes API integration, pod metadata caching, HTTP fetching, and stream handling. Verify correctness of EndpointSlice parsing, pod name hashing, and cache invalidation logic.
  • lib/runtime/src/discovery/mod.rs: Significant public API surface expansion. Review all new DiscoveryKey variants, DiscoverySpec/DiscoveryInstance enum changes, and trait method additions for backward compatibility.
  • lib/runtime/src/component/client.rs: Comprehensive refactoring from etcd to discovery streaming. Verify stream lifecycle management, error handling paths, and the monitor loop logic with new instance tracking.
  • Discovery stream integration across LLM services: Multiple files (watcher.rs, worker_monitor.rs, kv_router.rs, subscriber.rs) adopt streaming patterns. Verify consistent error handling and proper stream cleanup.
  • lib/runtime/src/discovery/kube.rs tests: New integration tests in kube_client_integration.rs and kube_discovery_integration.rs are marked #[ignore] requiring manual verification in a real cluster.
  • Public API changes: Verify all callers of list_all_instances(), discovery_client(), and spawn_system_status_server() are updated correctly with new signatures.
  • components/src/dynamo/vllm/main.py: Commented-out import leaves get_consolidator_endpoints undefined at runtime. Confirm this is intentional or update as needed.
  • Environment variable-driven backend selection: Review the DYN_DISCOVERY_BACKEND logic and fallback behavior in distributed.rs for robustness.

Poem

🐰 The discovery hops through Kubernetes skies,
Watching EndpointSlices with streaming eyes,
From storage to service, the data now flows,
A new discovery path that elegantly grows! ✨

Pre-merge checks

❌ Failed checks (1 inconclusive)

  • Description check (❓ Inconclusive): The description is minimal and lacks required details. It only states high-level objectives without explaining implementation details, testing approach, or specific file changes. Resolution: expand the description to include what problems this solves, specific implementation details, the testing approach, and any breaking changes or migration notes.

✅ Passed checks (2 passed)

  • Title check (✅ Passed): The title accurately describes the main changes: adding a Kubernetes implementation for discovery and a metadata endpoint.
  • Docstring Coverage (✅ Passed): Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
lib/runtime/src/component/endpoint.rs (1)

194-225: Keep the discovery registration alive

DiscoveryClient::register hands back an RAII registration guard whose Drop removes the entry. Because we don’t bind the returned guard, it’s dropped immediately, so nothing ever shows up in discovery even though we logged success. Hold the guard for the lifetime of the endpoint task (e.g., bind it to _discovery_guard) and only let it drop when we’re shutting down.

-        if let Err(e) = discovery_client.register(discovery_spec).await {
-            tracing::error!(
-                component_name,
-                endpoint_name,
-                error = %e,
-                "Unable to register service for discovery"
-            );
-            endpoint_shutdown_token.cancel();
-            return Err(error!(
-                "Unable to register service for discovery. Check discovery service status"
-            ));
-        }
+        let _discovery_guard = match discovery_client.register(discovery_spec).await {
+            Ok(guard) => guard,
+            Err(e) => {
+                tracing::error!(
+                    component_name,
+                    endpoint_name,
+                    error = %e,
+                    "Unable to register service for discovery"
+                );
+                endpoint_shutdown_token.cancel();
+                return Err(error!(
+                    "Unable to register service for discovery. Check discovery service status"
+                ));
+            }
+        };
lib/runtime/src/discovery/mock.rs (1)

124-166: Discovery mock never emits updates for modified instances

Line 145 currently only inserts the instance_id into known_instances. Once an id is present, any later change to that instance’s payload is ignored because the HashSet can’t detect data mutations. The real discovery backends re-emit an Added event when the resource changes, so the mock now diverges: tests that rely on watching for config updates will never see them.

Please track the full instance alongside its id and compare the payload before deciding whether to skip the event.

-        use std::collections::HashSet;
+        use std::collections::HashMap;

-            let mut known_instances = HashSet::new();
+            let mut known_instances: HashMap<u64, DiscoveryInstance> = HashMap::new();

-                let current: Vec<_> = {
+                let current: Vec<_> = {
                     let instances = registry.instances.lock().unwrap();
                     instances
                         .iter()
                         .filter(|instance| matches_key(instance, &key))
                         .cloned()
                         .collect()
                 };
 
-                let current_ids: HashSet<_> = current.iter().map(|i| {
-                    match i {
-                        DiscoveryInstance::Endpoint(inst) => inst.instance_id,
-                        DiscoveryInstance::ModelCard { instance_id, .. } => *instance_id,
-                    }
-                }).collect();
+                let current_map: HashMap<u64, DiscoveryInstance> = current
+                    .iter()
+                    .map(|instance| {
+                        let id = match instance {
+                            DiscoveryInstance::Endpoint(inst) => inst.instance_id,
+                            DiscoveryInstance::ModelCard { instance_id, .. } => *instance_id,
+                        };
+                        (id, instance.clone())
+                    })
+                    .collect();
 
                 // Emit Added events for new instances
-                for instance in current {
-                    let id = match &instance {
-                        DiscoveryInstance::Endpoint(inst) => inst.instance_id,
-                        DiscoveryInstance::ModelCard { instance_id, .. } => *instance_id,
-                    };
-                    if known_instances.insert(id) {
-                        yield Ok(DiscoveryEvent::Added(instance));
-                    }
+                for (id, instance) in current_map.iter() {
+                    match known_instances.get(id) {
+                        Some(previous) if previous == instance => {}
+                        _ => {
+                            known_instances.insert(*id, instance.clone());
+                            yield Ok(DiscoveryEvent::Added(instance.clone()));
+                        }
+                    }
                 }
 
                 // Emit Removed events for instances that are gone
-                for id in known_instances.difference(&current_ids).cloned().collect::<Vec<_>>() {
-                    yield Ok(DiscoveryEvent::Removed(id));
-                    known_instances.remove(&id);
+                for id in known_instances
+                    .keys()
+                    .filter(|id| !current_map.contains_key(id))
+                    .copied()
+                    .collect::<Vec<_>>()
+                {
+                    known_instances.remove(&id);
+                    yield Ok(DiscoveryEvent::Removed(id));
                 }
lib/llm/src/discovery/watcher.rs (1)

181-209: Do not collapse model-card keys to instance_id-only

A single pod can publish multiple DiscoveryInstance::ModelCards (e.g., generate + prefill endpoints share the same hashed instance_id). By switching the manager key to just format!("{:x}", instance_id), every additional card from the same pod overwrites the previous entry. When that pod is later removed, handle_delete only sees the last card, so the earlier models remain registered forever and never get cleaned up. You can observe the multi-card emission in KubeDiscoveryClient::filter_metadata, which yields one DiscoveryInstance::ModelCard per advertised model for the same instance_id. Please keep the key composite (namespace/component/endpoint + instance_id) like the legacy etcd path, e.g.:

- let key = format!("{:x}", instance_id);
+ let key = format!(
+     "{}/{}/{}/{:x}",
+     endpoint_id.namespace, endpoint_id.component, endpoint_id.name, instance_id
+ );

and mirror the same format in the delete path so removal evicts every model registered by that worker.

🧹 Nitpick comments (9)
.gitignore (1)

112-112: LGTM!

The addition of rebuild.sh to .gitignore is appropriate. If this script is generated by or specific to direnv workflows, consider optionally updating the section comment to reflect that it covers both direnv config files and related development artifacts.

k8s-test/README.md (1)

21-43: Annotate fenced blocks with languages

markdownlint is flagging MD040 because these fences don’t declare a language. Tag them with something like bash (or text where appropriate) so the docs lint cleanly.

k8s-test/LOCAL_TESTING.md (2)

72-76: Add language identifier to fenced code block.

The fenced code block starting at line 72 is missing a language identifier, which affects syntax highlighting and rendering.

Apply this diff to add the language identifier:

-```
+```text
 Local test mode: using localhost:8080 for pod dynamo-test-worker-8080
 Fetching metadata from http://localhost:8080/metadata

As per static analysis hints

---

208-208: Consider adding comma for clarity.

The sentence could benefit from a comma after "helper script" for improved readability.



```diff
-**Problem:** You created a pod without using the helper script and the name doesn't end with a port number.
+**Problem:** You created a pod without using the helper script, and the name doesn't end with a port number.
```

As per static analysis hints

lib/llm/src/kv_router/subscriber.rs (2)

307-324: Consider logging Added events for observability.

The code silently ignores DiscoveryEvent::Added events. While the subscriber only needs to handle removals for cleanup, logging Added events would improve observability and help with debugging discovery issues.

Apply this diff to add logging for Added events:

                 Some(discovery_event_result) = instance_event_stream.next() => {
                     let Ok(discovery_event) = discovery_event_result else {
                         continue;
                     };

-                    let dynamo_runtime::discovery::DiscoveryEvent::Removed(worker_id) = discovery_event else {
+                    match discovery_event {
+                        dynamo_runtime::discovery::DiscoveryEvent::Added(instance_id) => {
+                            tracing::debug!(
+                                instance_id = instance_id,
+                                "DISCOVERY: Generate endpoint instance added"
+                            );
+                        }
+                        dynamo_runtime::discovery::DiscoveryEvent::Removed(worker_id) => {
+                            tracing::warn!(
+                                worker_id = worker_id,
+                                "DISCOVERY: Generate endpoint instance removed, removing worker"
+                            );
+
+                            tracing::warn!("DISCOVERY_VALIDATION: remove_worker_tx: worker_id={}", worker_id);
+                            if let Err(e) = remove_worker_tx.send(worker_id).await {
+                                tracing::warn!("Failed to send worker removal for worker {worker_id}: {e}");
+                            }
+                        }
+                    }
-                        continue;
-                    };
-
-                    tracing::warn!(
-                        worker_id = worker_id,
-                        "DISCOVERY: Generate endpoint instance removed, removing worker"
-                    );
-
-                    tracing::warn!("DISCOVERY_VALIDATION: remove_worker_tx: worker_id={}", worker_id);
-                    if let Err(e) = remove_worker_tx.send(worker_id).await {
-                        tracing::warn!("Failed to send worker removal for worker {worker_id}: {e}");
-                    }
                 }

321-321: Remove or downgrade DISCOVERY_VALIDATION debug logging.

The DISCOVERY_VALIDATION prefix suggests this is temporary debug/validation logging. Consider removing it or downgrading to trace level once the discovery implementation is stable.

-                    tracing::warn!("DISCOVERY_VALIDATION: remove_worker_tx: worker_id={}", worker_id);
+                    tracing::trace!("Sending worker removal: worker_id={}", worker_id);
k8s-test/run-tests.sh (1)

43-46: Consider adding cargo availability check.

The script assumes cargo is available but doesn't validate this before attempting to run tests. While set -e will catch the error, an explicit check with a helpful message would improve user experience.

Add this check after the kubectl validation:

 echo "✅ kubectl is configured"
 echo "   Cluster: $(kubectl config current-context)"
 echo ""
+
+# Check if cargo is available
+if ! command -v cargo &> /dev/null; then
+    echo "❌ cargo is not available"
+    echo "   Please ensure Rust and cargo are installed"
+    exit 1
+fi
k8s-test/create-local-test-pod.sh (1)

44-45: Consider including port in SERVICE_NAME to avoid conflicts.

The SERVICE_NAME is derived only from DYNAMO_COMPONENT, which could cause conflicts if the script is run multiple times with the same component but different ports. The pod name includes the port for uniqueness, but the service name does not.

Apply this diff to make the service name unique:

 POD_NAME="dynamo-test-worker-${PORT}"
-SERVICE_NAME="dynamo-test-${DYNAMO_COMPONENT}"
+SERVICE_NAME="dynamo-test-${DYNAMO_COMPONENT}-${PORT}"

This ensures each service is uniquely identified and prevents conflicts when running multiple local test servers for the same component on different ports.

lib/runtime/src/component/client.rs (1)

242-379: Release the cache lock before awaiting discovery

Line 257 keeps the instance_sources mutex held while awaiting list_and_watch. If that network call stalls (e.g. Kubernetes API lag), every other endpoint trying to resolve its instance source will block on the mutex. Please narrow the critical section to the cache check/insert and drop the guard before any awaits.

-        let instance_sources = drt.instance_sources();
-        let mut instance_sources = instance_sources.lock().await;
+        let instance_sources = drt.instance_sources();
+        let mut instance_sources_guard = instance_sources.lock().await;

-        if let Some(instance_source) = instance_sources.get(endpoint) {
+        if let Some(instance_source) = instance_sources_guard.get(endpoint) {
             if let Some(instance_source) = instance_source.upgrade() {
                 tracing::debug!(
                     "get_or_create_dynamic_instance_source: Found cached instance source for endpoint: {}",
                     endpoint.path()
                 );
                 return Ok(instance_source);
             } else {
                 tracing::debug!(
                     "get_or_create_dynamic_instance_source: Cached instance source was dropped, removing for endpoint: {}",
                     endpoint.path()
                 );
-                instance_sources.remove(endpoint);
+                instance_sources_guard.remove(endpoint);
             }
         }
 
+        drop(instance_sources_guard);
+
         tracing::debug!(
             "get_or_create_dynamic_instance_source: Creating new instance source for endpoint: {}",
             endpoint.path()
         );
 
         let discovery_client = drt.discovery_client();
…
-        instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
+        let mut instance_sources_guard = instance_sources.lock().await;
+        instance_sources_guard.insert(endpoint.clone(), Arc::downgrade(&instance_source));
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e1547e2 and bf62593.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • lib/bindings/python/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (38)
  • .gitignore (1 hunks)
  • components/src/dynamo/vllm/main.py (1 hunks)
  • k8s-test/LOCAL_TESTING.md (1 hunks)
  • k8s-test/README.md (1 hunks)
  • k8s-test/cleanup.sh (1 hunks)
  • k8s-test/create-local-test-pod.sh (1 hunks)
  • k8s-test/deploy.sh (1 hunks)
  • k8s-test/manifests/test-deployment.yaml (1 hunks)
  • k8s-test/run-tests.sh (1 hunks)
  • lib/llm/src/discovery/watcher.rs (7 hunks)
  • lib/llm/src/discovery/worker_monitor.rs (3 hunks)
  • lib/llm/src/entrypoint/input/common.rs (2 hunks)
  • lib/llm/src/entrypoint/input/grpc.rs (2 hunks)
  • lib/llm/src/entrypoint/input/http.rs (2 hunks)
  • lib/llm/src/http/service/clear_kv_blocks.rs (3 hunks)
  • lib/llm/src/http/service/health.rs (1 hunks)
  • lib/llm/src/http/service/service_v2.rs (4 hunks)
  • lib/llm/src/kv_router.rs (3 hunks)
  • lib/llm/src/kv_router/scheduler.rs (1 hunks)
  • lib/llm/src/kv_router/subscriber.rs (3 hunks)
  • lib/llm/src/local_model.rs (2 hunks)
  • lib/llm/tests/http_metrics.rs (3 hunks)
  • lib/runtime/Cargo.toml (2 hunks)
  • lib/runtime/src/component.rs (2 hunks)
  • lib/runtime/src/component/client.rs (4 hunks)
  • lib/runtime/src/component/endpoint.rs (2 hunks)
  • lib/runtime/src/discovery/kube.rs (1 hunks)
  • lib/runtime/src/discovery/kv_store.rs (1 hunks)
  • lib/runtime/src/discovery/mock.rs (6 hunks)
  • lib/runtime/src/discovery/mod.rs (5 hunks)
  • lib/runtime/src/discovery/utils.rs (1 hunks)
  • lib/runtime/src/distributed.rs (5 hunks)
  • lib/runtime/src/instances.rs (1 hunks)
  • lib/runtime/src/lib.rs (1 hunks)
  • lib/runtime/src/storage/key_value_store.rs (1 hunks)
  • lib/runtime/src/system_status_server.rs (4 hunks)
  • lib/runtime/tests/kube_client_integration.rs (1 hunks)
  • lib/runtime/tests/kube_discovery_integration.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (20)
📚 Learning: 2025-11-05T08:41:06.483Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 4070
File: lib/discovery/src/systems/etcd/peer.rs:151-188
Timestamp: 2025-11-05T08:41:06.483Z
Learning: In lib/discovery/src/systems/etcd/peer.rs, the register_instance method intentionally captures the lease_id before entering the OperationExecutor closure. If the lease is revoked or fails, the operation should hard-fail rather than retry with a new lease, because the system does not track which entries were registered under which lease. Retrying with a fresh lease would create inconsistent state.

Applied to files:

  • lib/runtime/src/instances.rs
  • lib/runtime/src/discovery/kv_store.rs
  • lib/runtime/src/component/endpoint.rs
  • lib/llm/src/discovery/worker_monitor.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/discovery/watcher.rs
  • lib/runtime/src/discovery/mock.rs
📚 Learning: 2025-10-14T00:58:05.744Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3597
File: lib/llm/src/kv_router/indexer.rs:437-441
Timestamp: 2025-10-14T00:58:05.744Z
Learning: In lib/llm/src/kv_router/indexer.rs, when a KvCacheEventData::Cleared event is received, the system intentionally clears all dp_ranks for the given worker_id by calling clear_all_blocks(worker.worker_id). This is the desired behavior and should not be scoped to individual dp_ranks.

Applied to files:

  • lib/llm/src/http/service/clear_kv_blocks.rs
  • lib/llm/src/kv_router/subscriber.rs
📚 Learning: 2025-09-02T16:46:54.015Z
Learnt from: GuanLuo
Repo: ai-dynamo/dynamo PR: 2714
File: lib/llm/src/discovery/model_entry.rs:38-42
Timestamp: 2025-09-02T16:46:54.015Z
Learning: In lib/llm/src/discovery/model_entry.rs, GuanLuo prefers not to add serde defaults for model_type and model_input fields to keep the specification explicit and avoid user errors, relying on atomic deployment strategy to avoid backward compatibility issues.

Applied to files:

  • lib/llm/src/local_model.rs
  • lib/llm/src/kv_router.rs
  • lib/runtime/src/lib.rs
  • lib/runtime/src/component/endpoint.rs
  • lib/llm/src/entrypoint/input/common.rs
  • lib/llm/src/entrypoint/input/grpc.rs
  • lib/llm/tests/http_metrics.rs
  • lib/llm/src/discovery/worker_monitor.rs
  • lib/llm/src/http/service/service_v2.rs
  • lib/llm/src/discovery/watcher.rs
  • lib/runtime/src/discovery/mock.rs
  • lib/runtime/src/discovery/mod.rs
  • lib/llm/src/entrypoint/input/http.rs
📚 Learning: 2025-08-21T17:23:02.836Z
Learnt from: michaelfeil
Repo: ai-dynamo/dynamo PR: 2591
File: lib/bindings/python/rust/http.rs:0-0
Timestamp: 2025-08-21T17:23:02.836Z
Learning: In lib/bindings/python/rust/http.rs, the enable_endpoint method uses EndpointType::all() to dynamically support all available endpoint types with case-insensitive matching, which is more maintainable than hardcoded match statements for endpoint type mappings.

Applied to files:

  • lib/llm/src/local_model.rs
📚 Learning: 2025-10-16T13:35:33.710Z
Learnt from: grahamking
Repo: ai-dynamo/dynamo PR: 3659
File: lib/llm/src/common/checked_file.rs:113-124
Timestamp: 2025-10-16T13:35:33.710Z
Learning: In the dynamo project, model deployment cards stored in etcd are cleared by lease expiration, so there's no persistence of old card data across system restarts or upgrades.

Applied to files:

  • lib/llm/src/local_model.rs
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, when using `kv_get_and_watch_prefix()` and `dissolve()`, the returned `events_rx` receiver maintains the etcd watch stream independently. The watcher handle can be safely dropped (using `_watcher`) without terminating the stream, as the receiver keeps the connection alive internally. This is a consistent pattern used throughout the codebase in multiple critical modules.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/runtime/src/storage/key_value_store.rs
  • lib/llm/src/entrypoint/input/grpc.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/discovery/watcher.rs
  • lib/runtime/src/component/client.rs
  • lib/llm/src/entrypoint/input/http.rs
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, `PrefixWatcher` uses `#[derive(Dissolve)]` to generate a `dissolve()` method. The pattern `let (_, _watcher, mut events_rx) = prefix_watcher.dissolve();` is the standard and intended usage throughout the codebase. The `mpsc::Receiver<WatchEvent>` maintains the etcd watch stream independently, so the `Watcher` handle can be safely dropped. This pattern is used consistently in critical infrastructure modules like component/client.rs, utils/leader_worker_barrier.rs, and entrypoint/input/http.rs.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/runtime/src/discovery/utils.rs
  • lib/runtime/src/storage/key_value_store.rs
  • lib/llm/src/entrypoint/input/common.rs
  • lib/llm/src/entrypoint/input/grpc.rs
  • lib/llm/tests/http_metrics.rs
  • lib/llm/src/discovery/worker_monitor.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/discovery/watcher.rs
  • lib/runtime/src/component/client.rs
  • lib/llm/src/entrypoint/input/http.rs
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, when using `kv_get_and_watch_prefix()` and `dissolve()`, the returned `events_rx` receiver maintains the etcd watch stream independently. The watcher handle can be safely dropped (using `_watcher`) without terminating the stream, as the receiver keeps the connection alive internally.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/runtime/src/storage/key_value_store.rs
  • lib/llm/src/entrypoint/input/grpc.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/discovery/watcher.rs
  • lib/llm/src/entrypoint/input/http.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/discovery/watcher.rs
  • lib/runtime/src/component/client.rs
📚 Learning: 2025-06-16T20:02:54.935Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1236
File: lib/llm/src/mocker/protocols.rs:85-112
Timestamp: 2025-06-16T20:02:54.935Z
Learning: When using derive_builder::Builder macro, the macro generates the builder struct and its methods, but does NOT generate a `builder()` method on the original struct. A manual `impl StructName { pub fn builder() -> StructNameBuilder { StructNameBuilder::default() } }` is required to provide the convenient `StructName::builder()` API pattern.

Applied to files:

  • lib/llm/src/kv_router.rs
📚 Learning: 2025-06-02T19:37:27.666Z
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.

Applied to files:

  • lib/runtime/src/storage/key_value_store.rs
  • lib/llm/src/discovery/watcher.rs
  • lib/llm/src/entrypoint/input/http.rs
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.

Applied to files:

  • lib/llm/tests/http_metrics.rs
📚 Learning: 2025-05-30T06:38:09.630Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1285
File: lib/llm/src/kv_router/scoring.rs:58-63
Timestamp: 2025-05-30T06:38:09.630Z
Learning: In lib/llm/src/kv_router/scoring.rs, the user prefers to keep the panic behavior when calculating load_avg and variance with empty endpoints rather than adding guards for division by zero. They want the code to fail fast on this error condition.

Applied to files:

  • lib/llm/src/discovery/worker_monitor.rs
📚 Learning: 2025-06-05T01:02:15.318Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1392
File: lib/llm/src/kv_router/scoring.rs:35-46
Timestamp: 2025-06-05T01:02:15.318Z
Learning: In lib/llm/src/kv_router/scoring.rs, PeaBrane prefers panic-based early failure over Result-based error handling for the worker_id() method to catch invalid data early during development.

Applied to files:

  • lib/llm/src/discovery/worker_monitor.rs
📚 Learning: 2025-06-06T21:48:35.214Z
Learnt from: biswapanda
Repo: ai-dynamo/dynamo PR: 1412
File: lib/bindings/python/src/dynamo/runtime/logging.py:100-100
Timestamp: 2025-06-06T21:48:35.214Z
Learning: In the Dynamo codebase, BentoML has been completely removed from all executable code, with only documentation and attribution references remaining. The error_loggers configuration in lib/bindings/python/src/dynamo/runtime/logging.py should not include "bentoml" since those modules no longer exist.

Applied to files:

  • components/src/dynamo/vllm/main.py
📚 Learning: 2025-09-17T20:55:06.333Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3095
File: lib/llm/src/kv_router/indexer.rs:0-0
Timestamp: 2025-09-17T20:55:06.333Z
Learning: When PeaBrane encounters a complex implementation issue that would significantly expand PR scope (like the remove_worker_sender method in lib/llm/src/kv_router/indexer.rs that required thread-safe map updates and proper shard targeting), they prefer to remove the problematic implementation entirely rather than rush a partial fix, deferring the proper solution to a future PR.

Applied to files:

  • lib/llm/src/kv_router/subscriber.rs
📚 Learning: 2025-09-21T01:40:52.456Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3155
File: components/backends/vllm/src/dynamo/vllm/main.py:228-233
Timestamp: 2025-09-21T01:40:52.456Z
Learning: In the dynamo codebase, error handling for distributed runtime client initialization (like runtime.namespace().component().endpoint().client()) is handled at the Rust level in the distributed runtime bindings, so Python-level try/catch blocks are not needed and would be redundant.

Applied to files:

  • lib/llm/src/kv_router/subscriber.rs
📚 Learning: 2025-07-16T12:41:12.543Z
Learnt from: grahamking
Repo: ai-dynamo/dynamo PR: 1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.

Applied to files:

  • lib/llm/src/kv_router/subscriber.rs
  • lib/runtime/src/component/client.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane suggested using tokio::select! arm ordering with the existing biased directive in the indexer to create a natural barrier for dump requests, ensuring KV events are drained before snapshotting. This approach leverages existing architecture (biased select) to solve race conditions with minimal code changes, which aligns with their preference for contained solutions.

Applied to files:

  • lib/runtime/src/component/client.rs
📚 Learning: 2025-10-07T20:32:27.578Z
Learnt from: keivenchang
Repo: ai-dynamo/dynamo PR: 3266
File: lib/llm/src/http/service/metrics.rs:833-854
Timestamp: 2025-10-07T20:32:27.578Z
Learning: In Axum 0.6+, routers with different state types can be merged successfully using .merge(). Each router maintains its own state internally, so a Router<Arc<MetricsHandlerState>> can be merged with a Router<Arc<service_v2::State>> without compilation issues.

Applied to files:

  • lib/runtime/src/system_status_server.rs
🧬 Code graph analysis (21)
lib/runtime/src/instances.rs (3)
lib/runtime/src/distributed.rs (1)
  • discovery_client (295-297)
lib/llm/src/http/service/service_v2.rs (1)
  • discovery_client (118-120)
lib/runtime/src/component/client.rs (1)
  • instances (96-101)
lib/llm/src/http/service/health.rs (1)
lib/runtime/src/instances.rs (1)
  • list_all_instances (15-42)
lib/llm/src/http/service/clear_kv_blocks.rs (2)
lib/runtime/src/distributed.rs (1)
  • discovery_client (295-297)
lib/llm/src/http/service/service_v2.rs (1)
  • discovery_client (118-120)
lib/llm/src/local_model.rs (3)
lib/llm/src/model_card.rs (2)
  • slug (272-274)
  • name (267-269)
lib/runtime/src/component.rs (8)
  • endpoint (270-278)
  • component (513-515)
  • component (667-673)
  • namespace (258-260)
  • namespace (676-683)
  • name (262-264)
  • name (509-511)
  • name (689-694)
lib/runtime/src/discovery/mod.rs (1)
  • from_model_card (83-99)
lib/llm/src/kv_router.rs (1)
lib/runtime/src/discovery/utils.rs (1)
  • watch_and_extract_field (41-106)
lib/runtime/src/discovery/utils.rs (2)
lib/runtime/src/discovery/mock.rs (3)
  • new (19-21)
  • new (32-43)
  • instance_id (94-96)
lib/runtime/src/discovery/mod.rs (2)
  • instance_id (152-157)
  • instance_id (192-192)
lib/runtime/src/discovery/kv_store.rs (2)
lib/runtime/src/storage/key_value_store.rs (10)
  • new (33-35)
  • new (74-76)
  • new (190-192)
  • new (503-516)
  • key (78-80)
  • key_str (82-84)
  • value (86-88)
  • from_raw (38-40)
  • entries (431-431)
  • memory (182-184)
lib/runtime/src/discovery/mod.rs (5)
  • instance_id (152-157)
  • instance_id (192-192)
  • register (195-195)
  • list (199-199)
  • list_and_watch (202-202)
lib/runtime/src/storage/key_value_store.rs (2)
lib/runtime/src/storage/key_value_store/mem.rs (2)
  • new (52-56)
  • new (60-70)
lib/runtime/src/transports/etcd.rs (2)
  • new (63-106)
  • new (489-532)
lib/runtime/src/component/endpoint.rs (2)
lib/runtime/src/distributed.rs (2)
  • discovery_client (295-297)
  • connection_id (280-282)
lib/runtime/src/component.rs (2)
  • endpoint (270-278)
  • subject (580-582)
lib/runtime/src/distributed.rs (3)
lib/llm/src/http/service/service_v2.rs (1)
  • discovery_client (118-120)
lib/runtime/src/system_status_server.rs (3)
  • discovery_metadata (81-85)
  • new (26-31)
  • new (65-73)
lib/runtime/src/discovery/kube.rs (2)
  • new (46-51)
  • new (173-206)
lib/llm/src/discovery/worker_monitor.rs (3)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • ModelDeploymentCard (433-438)
lib/runtime/src/discovery/utils.rs (1)
  • watch_and_extract_field (41-106)
lib/llm/src/local_model.rs (1)
  • card (329-331)
lib/runtime/tests/kube_client_integration.rs (1)
lib/runtime/src/discovery/kube.rs (2)
  • new_for_testing (214-245)
  • instance_id (615-617)
lib/llm/src/http/service/service_v2.rs (2)
lib/runtime/src/distributed.rs (3)
  • discovery_client (295-297)
  • new (50-259)
  • store (336-338)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • CancellationToken (67-78)
lib/runtime/src/discovery/kube.rs (3)
lib/runtime/src/distributed.rs (5)
  • runtime (272-274)
  • new (50-259)
  • from_settings (261-264)
  • from_settings (361-367)
  • store (336-338)
lib/runtime/src/system_status_server.rs (3)
  • new (26-31)
  • new (65-73)
  • port (41-43)
lib/runtime/src/discovery/mod.rs (5)
  • instance_id (152-157)
  • instance_id (192-192)
  • register (195-195)
  • list (199-199)
  • list_and_watch (202-202)
lib/runtime/src/component.rs (2)
lib/runtime/src/distributed.rs (1)
  • discovery_client (295-297)
lib/runtime/src/component/client.rs (1)
  • instances (96-101)
lib/runtime/tests/kube_discovery_integration.rs (1)
lib/runtime/src/discovery/kube.rs (4)
  • default (102-104)
  • new (46-51)
  • new (173-206)
  • instance_id (615-617)
lib/llm/src/kv_router/subscriber.rs (3)
lib/runtime/src/distributed.rs (1)
  • discovery_client (295-297)
lib/llm/src/http/service/service_v2.rs (1)
  • discovery_client (118-120)
lib/llm/src/kv_router/scoring.rs (1)
  • worker_id (26-37)
lib/runtime/src/component/client.rs (3)
lib/runtime/src/component.rs (9)
  • path (249-251)
  • path (518-525)
  • client (593-599)
  • new (658-664)
  • drt (203-205)
  • drt (467-469)
  • drt (630-632)
  • component (513-515)
  • component (667-673)
lib/runtime/src/distributed.rs (3)
  • new (50-259)
  • instance_sources (348-350)
  • discovery_client (295-297)
lib/runtime/src/runtime.rs (1)
  • secondary (248-250)
lib/runtime/src/discovery/mock.rs (1)
lib/runtime/src/discovery/mod.rs (3)
  • list (199-199)
  • instance_id (152-157)
  • instance_id (192-192)
lib/runtime/src/discovery/mod.rs (4)
lib/runtime/src/discovery/kube.rs (4)
  • hash_pod_name (21-26)
  • instance_id (615-617)
  • list (662-742)
  • list_and_watch (744-956)
lib/runtime/src/discovery/utils.rs (1)
  • watch_and_extract_field (41-106)
lib/runtime/src/discovery/kv_store.rs (3)
  • instance_id (80-82)
  • list (163-193)
  • list_and_watch (195-337)
lib/runtime/src/discovery/mock.rs (3)
  • instance_id (94-96)
  • list (110-117)
  • list_and_watch (119-166)
lib/runtime/src/system_status_server.rs (2)
lib/runtime/src/discovery/kube.rs (2)
  • new (46-51)
  • new (173-206)
lib/runtime/src/distributed.rs (1)
  • new (50-259)
🪛 Checkov (3.2.334)
k8s-test/manifests/test-deployment.yaml

[medium] 3-45: Containers should not run with allowPrivilegeEscalation

(CKV_K8S_20)


[medium] 3-45: Minimize the admission of root containers

(CKV_K8S_23)

🪛 LanguageTool
k8s-test/LOCAL_TESTING.md

[uncategorized] ~208-~208: Possible missing comma found.
Context: ... created a pod without using the helper script and the name doesn't end with a port nu...

(AI_HYDRA_LEO_MISSING_COMMA)

k8s-test/README.md

[uncategorized] ~193-~193: Possible missing comma found.
Context: ..._endpoints` Test the `list_and_watch()` method which creates a streaming watch. **Wha...

(AI_HYDRA_LEO_MISSING_COMMA)

🪛 markdownlint-cli2 (0.18.1)
k8s-test/LOCAL_TESTING.md

72-72: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

k8s-test/README.md

160-160: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


174-174: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


201-201: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


232-232: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


248-248: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


268-268: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


285-285: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


304-304: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


326-326: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


344-344: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (3)
k8s-test/run-tests.sh (1)

1-4: LGTM!

The script header with strict error handling is appropriate for this test orchestration script.

k8s-test/create-local-test-pod.sh (2)

62-109: LGTM! Well-structured Kubernetes manifests.

The Pod and Service manifests are correctly structured with appropriate labels, readiness probes, and networking configuration for local testing.


123-157: Clear and comprehensive usage instructions.

The instructions provide excellent guidance for users setting up local testing, including all necessary environment variables and cleanup commands.


 import uvloop
-from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints
+# from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints

⚠️ Potential issue | 🔴 Critical

Critical: Commented import causes runtime error when kvbm connector is enabled.

The import of get_consolidator_endpoints is commented out (Line 11), but the function is still invoked at line 219. This will raise a NameError at runtime if config.has_connector("kvbm") returns True.

Resolution options:

  1. If kvbm support is still needed, uncomment the import:
-# from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints
+from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints
  1. If kvbm is being deprecated as part of the Kubernetes migration, remove the dead code:
     # Set up consolidator endpoints if KVBM is enabled
     consolidator_endpoints = None
-    if config.has_connector("kvbm"):
-        consolidator_endpoints = get_consolidator_endpoints(vllm_config)
-    vllm_config.consolidator_endpoints = consolidator_endpoints
+    vllm_config.consolidator_endpoints = None

Also applies to: 216-220

🤖 Prompt for AI Agents
In components/src/dynamo/vllm/main.py around lines 11 and 216-220, the import of
get_consolidator_endpoints is commented out but the function is invoked later
when config.has_connector("kvbm") is true, causing a NameError; fix by either
uncommenting and restoring the import at line 11 if kvbm support is required, or
remove the kvbm-specific invocation and related dead code (lines ~216-220) if
kvbm is deprecated, and ensure config.has_connector("kvbm") branches are
consistent with the chosen approach.

Comment on lines +99 to +102
  selector:
    app: dynamo-local-test
    dynamo.nvidia.com/namespace: "$DYNAMO_NAMESPACE"
    dynamo.nvidia.com/component: "$DYNAMO_COMPONENT"

⚠️ Potential issue | 🟠 Major

Service selector may be too restrictive for multiple pods.

The service selector matches on app, dynamo.nvidia.com/namespace, and dynamo.nvidia.com/component. If you create multiple pods for the same component (e.g., multiple backend instances on different ports), they would all be selected by the same service. This conflicts with the local testing model where each pod should route to a different localhost port.

Given the recommendation to change SERVICE_NAME to include the port, you should also update the selector to be more specific. Consider adding the local-test-port label to the selector:

   selector:
     app: dynamo-local-test
+    local-test-port: "$PORT"
     dynamo.nvidia.com/namespace: "$DYNAMO_NAMESPACE"
     dynamo.nvidia.com/component: "$DYNAMO_COMPONENT"

This ensures each service only routes to its corresponding pod.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
   selector:
     app: dynamo-local-test
+    local-test-port: "$PORT"
     dynamo.nvidia.com/namespace: "$DYNAMO_NAMESPACE"
     dynamo.nvidia.com/component: "$DYNAMO_COMPONENT"
🤖 Prompt for AI Agents
In k8s-test/create-local-test-pod.sh around lines 99 to 102, the Service
selector is too broad (matching app, dynamo.nvidia.com/namespace, and
dynamo.nvidia.com/component) which will select multiple pods and break per-port
local testing; update the Service selector to include the per-instance label
(e.g., local-test-port: "<port>") so the Service only targets the pod for that
port, and ensure the pod manifest/labels and the script that sets
SERVICE_NAME/port add the matching local-test-port label/value.

Comment on lines +25 to +37
echo "🔍 Checking for test resources in namespace: $NAMESPACE"

# Check if test resources are deployed
PODS=$(kubectl get pods -l app=dynamo-test --namespace="$NAMESPACE" --no-headers 2>/dev/null | wc -l)
if [ "$PODS" -eq 0 ]; then
echo "⚠️ Test resources not deployed in namespace: $NAMESPACE"
echo " Run ./deploy.sh $NAMESPACE to create test resources"
echo " (Tests will still run but may not find any endpoints)"
echo ""
else
echo "✅ Found $PODS test pods in namespace: $NAMESPACE"
echo ""
fi

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that tests can discover resources despite namespace mismatch.

The script checks for resources in the provided NAMESPACE (default: "default") but notes on line 91 that "the actual KubeDiscoveryClient namespace is determined by POD_NAMESPACE env var in test code." This could lead to confusion if a user provides a namespace argument but the tests look elsewhere.

Consider one of these approaches:

  1. Pass the NAMESPACE argument to tests via POD_NAMESPACE environment variable
  2. Remove the NAMESPACE argument and document that POD_NAMESPACE must be set
  3. Make the warning more prominent when namespace doesn't match

🏁 Script executed:

#!/bin/bash
# Verify if the test code respects a POD_NAMESPACE environment variable
# or if it has a hardcoded namespace

rg -n "POD_NAMESPACE" lib/runtime/tests/

Length of output: 0


I'll conduct broader searches to verify the review concern more thoroughly.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find the run-tests.sh file and examine its content
fd -name "run-tests.sh" -type f
```

Length of output: 230


🏁 Script executed:

#!/bin/bash
# Search for POD_NAMESPACE references across the entire codebase
rg -n "POD_NAMESPACE" --type-list | head -20

Length of output: 389


🏁 Script executed:

#!/bin/bash
# Search for POD_NAMESPACE usage without path restrictions
rg -n "POD_NAMESPACE" -i

Length of output: 2757


🏁 Script executed:

#!/bin/bash
# Look for namespace-related environment variables in test or runtime code
rg -n "NAMESPACE" --type=sh -C3 | head -50

Length of output: 2590


I need to see how tests are actually executed in the script to complete verification.


---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the full run-tests.sh to understand test execution
cat k8s-test/run-tests.sh

Length of output: 3497


Pass NAMESPACE argument to tests via POD_NAMESPACE environment variable.

The NAMESPACE argument (line 23) is only used to check resource existence; it's never passed to the actual test execution. Tests read POD_NAMESPACE from the environment (lib/runtime/src/discovery/kube.rs:128), which defaults to "default" if unset. This creates a mismatch: a user providing ./run-tests.sh client "" my-namespace still runs tests against POD_NAMESPACE (not my-namespace).
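
For reference, a minimal sketch of the fallback behavior described above (not the actual code at kube.rs:128):

```rust
// Prefer POD_NAMESPACE; otherwise fall back to "default".
fn pod_namespace() -> String {
    std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string())
}
```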

Add before each cargo test invocation:

export POD_NAMESPACE="$NAMESPACE"

This aligns test behavior with the user's namespace argument and eliminates confusion. The warning on line 91 can then be removed since the script guarantees namespace consistency.

🤖 Prompt for AI Agents
In k8s-test/run-tests.sh around lines 25 to 37, the script only uses the
NAMESPACE argument to check for deployed pods but never exports it for the
tests; this causes tests to use POD_NAMESPACE's default instead of the provided
NAMESPACE. Fix by exporting POD_NAMESPACE="$NAMESPACE" before any cargo test
invocations (add an export line before each cargo test call or once globally
before running tests) so the test process reads the intended namespace; remove
the now-unnecessary warning about namespace mismatch.

Comment on lines +187 to +192
let instances_filtered: Vec<dynamo_runtime::component::Instance> = discovery_instances
.into_iter()
.filter(|instance| instance.endpoint == CLEAR_KV_ENDPOINT)
.collect::<Vec<_>>();

if instances_filtered.is_empty() {
let found_endpoints: Vec<String> = instances
.iter()
.map(|instance| instance.endpoint.clone())
.collect();
add_worker_result(
false,
entry_name,
&format!(
"Worker group doesn't support clear_kv_blocks. Supported endpoints: {}",
found_endpoints.join(", ")
),
namespace,
component,
None,
);
continue;
}
.map(|di| match di {
dynamo_runtime::discovery::DiscoveryInstance::Endpoint(instance) => instance,
})
.collect();

⚠️ Potential issue | 🔴 Critical

Handle all DiscoveryInstance variants to prevent panic.

The pattern match only handles DiscoveryInstance::Endpoint but doesn't account for other possible variants (such as ModelCard). If discovery_client.list() returns a non-Endpoint variant, this code will panic.

Apply this diff to handle all variants gracefully:

```diff
 let instances_filtered: Vec<dynamo_runtime::component::Instance> = discovery_instances
     .into_iter()
-    .map(|di| match di {
-        dynamo_runtime::discovery::DiscoveryInstance::Endpoint(instance) => instance,
-    })
+    .filter_map(|di| match di {
+        dynamo_runtime::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
+        _ => {
+            tracing::warn!("Unexpected discovery instance variant for clear_kv_blocks endpoint");
+            None
+        }
+    })
     .collect();
```
🤖 Prompt for AI Agents
In lib/llm/src/http/service/clear_kv_blocks.rs around lines 187 to 192, the
current map only handles DiscoveryInstance::Endpoint and will panic on other
variants; change the logic to handle all variants by using a filter_map (or
match that returns Option) that extracts the Endpoint instance and skips
non-Endpoint variants (optionally logging or counting skipped variants) so the
code never panics if discovery_client.list() returns ModelCard or other
variants.

Comment on lines +77 to +92

```rust
// Initialize discovery client backed by KV store
// Create a cancellation token for the discovery client's watch streams
let discovery_client = {
    let cancel_token = CancellationToken::new();
    Arc::new(KVStoreDiscoveryClient::new(
        store.clone(),
        cancel_token,
    )) as Arc<dyn DiscoveryClient>
};

Self {
    manager,
    metrics: Arc::new(Metrics::default()),
    store,
    discovery_client,
    flags: StateFlags {
```

⚠️ Potential issue | 🔴 Critical

Don't hardcode a KV discovery client here

By constructing a KVStoreDiscoveryClient unconditionally, this service ignores the Kubernetes discovery backend that the rest of the runtime now supports. When DYN_DISCOVERY_BACKEND=kubernetes (the main scenario this PR enables), the KV store remains empty, so the HTTP service never sees any endpoints or model cards—client-facing APIs will report an empty model list even though pods are registered. Please source the discovery client from the same backend selection logic as DistributedRuntime (e.g., accept an Arc<dyn DiscoveryClient> from the builder or share the runtime’s already-initialized client) instead of hardcoding the KV implementation.

🤖 Prompt for AI Agents
In lib/llm/src/http/service/service_v2.rs around lines 77 to 92, the code
unconditionally constructs a KVStoreDiscoveryClient which bypasses the
Kubernetes discovery backend; instead of hardcoding KV, accept or reuse an
Arc<dyn DiscoveryClient> from the builder or from the already-initialized
DistributedRuntime and assign that to discovery_client. Modify the service
constructor to take an Arc<dyn DiscoveryClient> parameter (or a reference to the
runtime that exposes it), remove the KVStoreDiscoveryClient::new() call here,
and wire the provided discovery client into Self so the HTTP service respects
DYN_DISCOVERY_BACKEND selection. Ensure cancellation token handling remains
compatible with the provided client (or move token creation to the provider) and
update callers/tests to pass the appropriate DiscoveryClient.
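A minimal sketch of the injection being requested, assuming the state struct can simply take the runtime's already-selected client; the trait bound and constructor below are illustrative, not the actual service_v2.rs API:

```rust
use std::sync::Arc;

// Stand-in for dynamo_runtime::discovery::DiscoveryClient (assumed shape).
trait DiscoveryClient: Send + Sync {}

struct State {
    // Injected rather than constructed here, so the HTTP service follows
    // whatever backend DYN_DISCOVERY_BACKEND selected for the runtime.
    discovery_client: Arc<dyn DiscoveryClient>,
    // other fields (manager, metrics, store, flags) elided
}

impl State {
    fn new(discovery_client: Arc<dyn DiscoveryClient>) -> Self {
        Self { discovery_client }
    }
}
```

With this shape, the builder passes in whichever client the runtime already initialized instead of the service constructing its own KV-backed one.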

Comment on lines +418 to +432
```rust
let spec = DiscoverySpec::from_model_card(
    endpoint.component().namespace().name().to_string(),
    endpoint.component().name().to_string(),
    endpoint.name().to_string(),
    &self.card,
)?;
let _instance = discovery.register(spec).await?;

tracing::warn!(
    "DISCOVERY_VALIDATION: model_card_registered: namespace={}, component={}, endpoint={}, model_name={}",
    endpoint.component().namespace().name(),
    endpoint.component().name(),
    endpoint.name(),
    self.card.name()
);
```

⚠️ Potential issue | 🔴 Critical

Registration handle is dropped immediately

DiscoveryClient::register returns a handle (DiscoveryInstance) whose Drop un-registers the spec. Assigning it to _instance and letting it go out of scope means the model is unregistered as soon as attach returns, so ingress will never see it. Please retain the handle for as long as the model should stay registered (e.g., store it on LocalModel and release it on detach/shutdown).

🤖 Prompt for AI Agents
In lib/llm/src/local_model.rs around lines 418 to 432, the DiscoveryInstance
returned by discovery.register is assigned to a temporary _instance and
immediately dropped, which causes the registration to be removed; retain the
handle on the LocalModel so the registration stays active by adding a field
(e.g., discovery_instance: Option<DiscoveryInstance>) to the LocalModel struct,
store the returned handle in that field in attach, and clear/drop it explicitly
in detach/shutdown to unregister when the model should actually be removed.
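A sketch of the retention pattern being asked for, assuming the handle unregisters in its Drop impl; the field name and the RegistrationHandle stand-in below are illustrative:

```rust
// Stand-in for the handle returned by DiscoveryClient::register, assumed to
// unregister the spec when dropped.
struct RegistrationHandle;

struct LocalModel {
    // Held for as long as the model should remain discoverable.
    discovery_instance: Option<RegistrationHandle>,
}

impl LocalModel {
    fn attach(&mut self, handle: RegistrationHandle) {
        // Storing the handle instead of binding it to `_instance` keeps the
        // registration alive after attach returns.
        self.discovery_instance = Some(handle);
    }

    fn detach(&mut self) {
        // Dropping the handle here is what actually performs the unregistration.
        self.discovery_instance = None;
    }
}
```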

Comment on lines +305 to +326
```rust
// Fast path: check cache
{
    let cache = self.metadata_cache.read().await;
    if let Some(cached) = cache.get(&instance_id) {
        tracing::debug!(
            "Cache hit for pod_name={}, instance_id={:x}",
            pod_name,
            instance_id
        );
        return Ok(cached.metadata.clone());
    }
}

// Cache miss: fetch from remote pod
tracing::debug!(
    "Cache miss for pod_name={}, instance_id={:x}, fetching from {}",
    pod_name,
    instance_id,
    target_host
);
self.fetch_and_cache_from_host(instance_id, pod_name, &target_host).await
```

⚠️ Potential issue | 🔴 Critical

Cache must refresh to avoid permanently stale metadata.

get_metadata returns early on any cache hit, and the cache is only cleared when a pod disappears. If a pod adds or updates endpoints/model cards while staying ready, every other node will keep serving the old metadata forever (the fetched_at timestamp is never consulted). list and list_and_watch both rely on this path, so discovery data goes stale cluster-wide. Please add an expiry/version check (e.g., TTL based on fetched_at, ETag, or ResourceVersion) or always refetch when the EndpointSlice reports a change so metadata stays current.
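One way to bound the staleness is a TTL check against fetched_at on the hit path, roughly like the sketch below; the cache-entry shape and the 30-second TTL are assumptions, not the current kube.rs code:

```rust
use std::time::{Duration, Instant};

// Assumed cache-entry shape: metadata plus the time it was fetched.
struct CachedEntry<M> {
    metadata: M,
    fetched_at: Instant,
}

// Illustrative TTL; a ResourceVersion or ETag comparison would work as well.
const METADATA_TTL: Duration = Duration::from_secs(30);

fn cache_hit<M: Clone>(entry: Option<&CachedEntry<M>>) -> Option<M> {
    // Treat the entry as a hit only while it is younger than the TTL,
    // otherwise fall through to the remote fetch so updates propagate.
    entry
        .filter(|e| e.fetched_at.elapsed() < METADATA_TTL)
        .map(|e| e.metadata.clone())
}
```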

Comment on lines +843 to +948
```rust
// Extract ALL current instances from ALL slices
let current_instances: HashSet<u64> = all_slices.iter()
    .flat_map(Self::extract_instance_ids)
    .collect();

// Build endpoint info map for fetching
let mut endpoint_info_map = HashMap::new();
for slice in &all_slices {
    let endpoint_infos = Self::extract_endpoint_info(slice);
    for (instance_id, pod_name, pod_ip) in endpoint_infos {
        endpoint_info_map.entry(instance_id)
            .or_insert((pod_name, pod_ip));
    }
}

// Diff against previous state
let prev_instances = known_instances.read().await.clone();
let added: Vec<_> = current_instances.difference(&prev_instances).copied().collect();
let removed: Vec<_> = prev_instances.difference(&current_instances).copied().collect();

if !added.is_empty() || !removed.is_empty() {
    tracing::debug!(
        stream_id = %stream_id,
        added = added.len(),
        removed = removed.len(),
        total = current_instances.len(),
        "State diff computed"
    );
}

// Update known_instances before fetching
*known_instances.write().await = current_instances.clone();

// Fetch metadata for new instances concurrently
let fetch_futures: Vec<_> = added.iter().filter_map(|&instance_id| {
    endpoint_info_map.get(&instance_id).map(|(pod_name, pod_ip)| {
        let client = client.clone();
        let pod_name = pod_name.clone();
        let pod_ip = pod_ip.clone();
        let key_clone = key_clone.clone();
        let known_instances = known_instances.clone();

        async move {
            match client.get_metadata(&pod_name, &pod_ip).await {
                Ok(metadata) => {
                    // Fetch-after-delete guard: check if still in known set
                    if known_instances.read().await.contains(&instance_id) {
                        let instances = Self::filter_metadata(&metadata, &key_clone, instance_id);
                        Some((instance_id, instances))
                    } else {
                        tracing::debug!(
                            stream_id = %stream_id,
                            instance_id = format!("{:x}", instance_id),
                            "Instance removed before fetch completed, skipping"
                        );
                        None
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        stream_id = %stream_id,
                        pod_name = %pod_name,
                        error = %e,
                        "Failed to fetch metadata"
                    );
                    None
                }
            }
        }
    })
}).collect();

// Fetch concurrently and emit Added events
let results: Vec<_> = futures::stream::iter(fetch_futures)
    .buffer_unordered(20)
    .collect()
    .await;

for result in results {
    if let Some((_instance_id, instances)) = result {
        for instance in instances {
            tracing::info!(
                stream_id = %stream_id,
                instance_id = format!("{:x}", instance.instance_id()),
                "Emitting Added event"
            );
            if tx.send(Ok(DiscoveryEvent::Added(instance))).is_err() {
                tracing::debug!(stream_id = %stream_id, "Receiver dropped, stopping monitor");
                return;
            }
        }
    }
}

// Emit Removed events
for instance_id in removed {
    tracing::info!(
        stream_id = %stream_id,
        instance_id = format!("{:x}", instance_id),
        "Emitting Removed event"
    );
    client.invalidate_cache(instance_id).await;
    if tx.send(Ok(DiscoveryEvent::Removed(instance_id))).is_err() {
        tracing::debug!(stream_id = %stream_id, "Receiver dropped, stopping monitor");
        return;
    }
```

⚠️ Potential issue | 🔴 Critical

Watch loop never emits updates for pods whose metadata changes.

Here the diff only tracks instance_ids (pod hashes). When a pod updates its advertised endpoints/model cards but remains in the EndpointSlice, it is neither in added nor removed, so you never refetch its metadata and no Added/Removed events fire. As a result, downstream consumers never learn about the change. Please trigger a refresh when an EndpointSlice changes (e.g., compare ResourceVersions/per-address hashes, or refetch on every Applied event) so metadata updates propagate even when the pod itself stays online.
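One possible shape for that refresh: keep a per-instance fingerprint (for example a hash of the last fetched metadata, or the EndpointSlice ResourceVersion observed for the pod) and refetch whenever it changes. The helper below is an assumption about how the extra diff could look, not code from the PR:

```rust
use std::collections::HashMap;

// Hypothetical fingerprint per instance, e.g. a hash of its advertised metadata.
type Fingerprint = u64;

/// Instances that were already known but whose fingerprint changed since the
/// last pass; the watch loop would refetch these and emit Removed + Added
/// events so downstream consumers see the update.
fn changed_instances(
    previous: &HashMap<u64, Fingerprint>,
    current: &HashMap<u64, Fingerprint>,
) -> Vec<u64> {
    let mut changed = Vec::new();
    for (id, fp) in current {
        if let Some(old) = previous.get(id) {
            if old != fp {
                changed.push(*id);
            }
        }
        // Brand-new ids are already covered by the existing `added` path.
    }
    changed
}
```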
